package cn.tannn.tregistry.client;

import cn.tannn.tregistry.core.api.Server;
import cn.tannn.tregistry.core.api.Snapshot;
import cn.tannn.tregistry.core.http.OkHttpInvoker;
import cn.tannn.tregistry.core.model.InstanceMeta;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static cn.tannn.tregistry.client.HttpPathConstant.*;
import static cn.tannn.tregistry.core.http.HttpInvoker.log;


/**
 * HTTP client for the tregistry registry center (cluster of registry servers).
 *
 * @author <a href="https://tannn.cn/">tan</a>
 * @date 2024/4/23 13:11
 */
public class ServerClient {

    private static final Logger logger = LoggerFactory.getLogger(ServerClient.class);

    /**
     * Upper bound on attempts when looking for a leader, so that an unreachable
     * cluster produces a clear failure instead of unbounded recursion.
     */
    private static final int MAX_LEADER_ATTEMPTS = 10;

    /**
     * Pause between two leader lookup attempts, in milliseconds.
     */
    private static final long RETRY_PAUSE_MILLIS = 500L;

    private OkHttpInvoker okHttpInvoker;

    /**
     * Cluster nodes known to this client. The list is replaced as a whole each time
     * a cluster listing is fetched successfully, so it never accumulates duplicates.
     * Volatile because the refresh task and caller threads both access it.
     */
    public volatile List<Server> servers = new ArrayList<>();

    /**
     * URL of the current leader; write operations (reg / unreg / renew / subscribe) go there.
     */
    private volatile String masterUrl;

    SecureRandom random = new SecureRandom();
    TnExecutors executors = new TnExecutors();

    /**
     * Renewal table used by {@link #start()}: key = instance JSON, values = names of the
     * services registered for that instance.
     */
    public MultiValueMap<String, String> RENEWS = new LinkedMultiValueMap<>();


    /**
     * Creates a client from a list of registry addresses.
     *
     * @param connectString registry addresses, each formatted as ip + ":" + port
     * @throws IllegalArgumentException if no usable address is supplied
     * @throws IllegalStateException    if no leader can be found
     */
    public ServerClient(List<String> connectString) {
        structure(connectString);
    }

    /**
     * Creates a client from a comma separated list of registry addresses.
     *
     * @param connectString comma separated addresses, each formatted as ip + ":" + port
     * @throws IllegalArgumentException if no usable address is supplied
     * @throws IllegalStateException    if no leader can be found
     */
    public ServerClient(String connectString) {
        structure(Arrays.asList(connectString.split(",")));
    }

    /**
     * Shared constructor logic: starts the executors, creates the HTTP invoker and
     * resolves the leader. The executors are stopped again if leader resolution fails,
     * so a failed construction does not leave them running.
     */
    private void structure(List<String> connectString) {
        executors.start();
        this.okHttpInvoker = new OkHttpInvoker(5000);
        try {
            init(connectString);
        } catch (RuntimeException e) {
            executors.stop();
            throw e;
        }
    }

    /**
     * Resolves the leader by asking randomly chosen addresses for the cluster listing,
     * and stores its URL as the target for write operations. Also refreshes {@link #servers}.
     * Makes at most {@link #MAX_LEADER_ATTEMPTS} attempts, pausing between them.
     *
     * @param connectString registry addresses, each formatted as ip + ":" + port
     * @throws IllegalArgumentException if the list is null or contains no non-blank address
     * @throws IllegalStateException    if no leader with a URL is found within the attempt limit
     */
    public void init(List<String> connectString) {
        List<String> addresses = connectString == null
                ? List.of()
                : connectString.stream()
                .filter(address -> address != null && !address.isBlank())
                .map(String::trim)
                .toList();
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("[tregistry] no registry address supplied");
        }

        Server leader = null;
        for (int attempt = 1; attempt <= MAX_LEADER_ATTEMPTS; attempt++) {
            String randomUri = "http://" + addresses.get(random.nextInt(addresses.size()));
            Server candidate = leaderSelect(randomUri);
            if (candidate != null && candidate.getUrl() != null) {
                leader = candidate;
                break;
            }
            // Stop retrying when the thread was interrupted while waiting.
            if (attempt < MAX_LEADER_ATTEMPTS && !pauseBeforeRetry()) {
                break;
            }
        }
        if (leader == null) {
            throw new IllegalStateException("[tregistry] no leader found via " + addresses);
        }
        this.masterUrl = leader.getUrl();
    }

    /**
     * Registers a service instance on the leader and records it for periodic renewal.
     *
     * @param service  service name
     * @param instance service instance as JSON (see {@link InstanceMeta})
     */
    public void reg(String service, String instance) {
        okHttpInvoker.post(instance, masterUrl + REG_PATH + service);
        logger.debug(" ====> [tregistry]: register instance {} for {} ", instance, service);
        // Avoid duplicate entries when the same instance/service pair is registered twice.
        List<String> registered = RENEWS.get(instance);
        if (registered == null || !registered.contains(service)) {
            RENEWS.add(instance, service);
        }
    }


    /**
     * Unregisters a service instance on the leader and stops renewing it.
     *
     * @param service  service name
     * @param instance service instance as JSON (see {@link InstanceMeta})
     */
    public void unreg(String service, String instance) {
        okHttpInvoker.post(instance, masterUrl + UNREG_PATH + service);
        logger.debug(" ====> [tregistry]: unregister instance {} for {} ", instance, service);
        // Map.remove(key, value) would compare the stored List with the String and never
        // match, so remove the service from the instance's list explicitly.
        List<String> registered = RENEWS.get(instance);
        if (registered != null) {
            registered.remove(service);
            if (registered.isEmpty()) {
                RENEWS.remove(instance);
            }
        }
    }

    /**
     * Reports the health of an instance for a single service to the leader.
     *
     * @param service  service name
     * @param instance service instance as JSON (see {@link InstanceMeta})
     * @return the value returned by the registry (presumably a timestamp; confirm against the server)
     */
    public Long renew(String service, String instance) {
        logger.debug(" ====> [tregistry]:  up health  instance {} for {} ", instance, service);
        return okHttpInvoker.post(instance, masterUrl + RENEW_PATH + service, Long.class);
    }

    /**
     * Reports the health of an instance for several services to the leader.
     *
     * @param services comma separated service names
     * @param instance service instance as JSON (see {@link InstanceMeta})
     * @return the value returned by the registry (presumably a timestamp; confirm against the server)
     */
    public Long renews(String services, String instance) {
        logger.debug(" ====> [tregistry]:  up health's  instance {} for services {} ", instance, services);
        return okHttpInvoker.post(instance, masterUrl + RENEWS_PATH + services, Long.class);
    }


    // -------------------------------------------------------

    /**
     * Fetches all instances of a service from a randomly chosen cluster node.
     *
     * @param service service name
     * @return the instances returned by the registry
     */
    public List<InstanceMeta> findAll(String service) {
        logger.debug(" ====> [tregistry]: find all  instance for {} ", service);
        return okHttpInvoker.get(clusterRandomUrl() + FINDALL_PATH + service,
                new TypeReference<List<InstanceMeta>>() {
                });
    }

    /**
     * Fetches the version of a service from a randomly chosen cluster node.
     *
     * @param service service name
     * @return version
     */
    public Long version(String service) {
        logger.debug(" ====> [tregistry]: find version  instance for {} ", service);
        return okHttpInvoker.get(clusterRandomUrl() + VERSION_PATH + service, Long.class);
    }


    /**
     * Fetches the node information of a randomly chosen cluster node.
     *
     * @return the node information returned by that node
     */
    public Server info() {
        logger.debug(" ====> [tregistry]:  find info ");
        return okHttpInvoker.get(clusterRandomUrl() + INFO_PATH, new TypeReference<Server>() {
        });
    }

    /**
     * Fetches the cluster listing from a randomly chosen node.
     *
     * @return the nodes whose status flag is set; empty if the registry returned nothing
     */
    public List<Server> cluster() {
        List<Server> clusters = okHttpInvoker.get(clusterRandomUrl() + CLUSTER_PATH,
                new TypeReference<List<Server>>() {
                });
        logger.debug(" ====> [tregistry]:  find cluster {} ", clusters);
        if (clusters == null) {
            return List.of();
        }
        return clusters.stream().filter(Server::isStatus).toList();
    }

    /**
     * Fetches the cluster listing from a specific node. Failures are logged and
     * reported as an empty list so that the caller can try another node.
     *
     * @param url registry address, e.g. http://ip:port
     * @return the nodes whose status flag is set; empty on any failure
     */
    private List<Server> cluster(String url) {
        try {
            List<Server> clusters = okHttpInvoker.get(url + CLUSTER_PATH,
                    new TypeReference<List<Server>>() {
                    });
            logger.debug(" ====> [tregistry]:  find cluster {} ", clusters);
            if (clusters != null) {
                return clusters.stream().filter(Server::isStatus).toList();
            }
        } catch (Exception e) {
            logger.error("===> leaderSelect  leader error {}", e.getMessage());
        }
        return new ArrayList<>();
    }

    /**
     * Asks randomly chosen cluster nodes for the current leader. Makes at most
     * {@link #MAX_LEADER_ATTEMPTS} attempts, pausing between them.
     *
     * @return the leader whose status flag is set, or {@code null} if none was found
     * @throws IllegalStateException if no cluster node is known
     */
    public Server leader() {
        Server result = null;
        for (int attempt = 1; attempt <= MAX_LEADER_ATTEMPTS; attempt++) {
            // Pick a new node for every attempt so one broken node cannot block the lookup.
            String randomUrl = clusterRandomUrl();
            try {
                Server candidate = okHttpInvoker.get(randomUrl + LEADER_PATH, new TypeReference<Server>() {
                });
                if (candidate != null && candidate.isStatus()) {
                    result = candidate;
                    break;
                }
            } catch (Exception e) {
                logger.error(" ====> [tregistry]:  find leader error {}", e.getMessage());
            }
            if (attempt < MAX_LEADER_ATTEMPTS && !pauseBeforeRetry()) {
                break;
            }
        }
        logger.debug(" ====> [tregistry]:  find leader {} ", result);
        return result;
    }

    /**
     * Asks a specific node for the current leader.
     *
     * @param url registry address, e.g. http://ip:port
     * @return the leader reported by that node
     */
    private Server leader(String url) {
        Server result = okHttpInvoker.get(url + LEADER_PATH, new TypeReference<Server>() {
        });
        logger.debug(" ====> [tregistry]:  find leader {} ", result);
        return result;
    }


    /**
     * Fetches the cluster snapshot from a randomly chosen node.
     *
     * @return Snapshot
     */
    public Snapshot snapshot() {
        Snapshot result = okHttpInvoker.get(clusterRandomUrl() + SNAPSHOT_PATH, new TypeReference<Snapshot>() {
        });
        logger.debug(" ====> [tregistry]:  find snapshot {} ", result);
        return JSON.to(Snapshot.class, result);
    }


    /**
     * Schedules the two background tasks: periodic health renewal of every registered
     * instance, and periodic refresh of the known cluster nodes and leader.
     */
    public void start() {
        // Health reporting.
        executors.providerCheck(() -> renewAll());

        // Probe the registry servers and re-resolve the leader.
        executors.servicesUrlCheck(() -> refreshServers());
    }

    /**
     * Renews every instance recorded in {@link #RENEWS}. A failure for one instance is
     * logged and does not prevent the remaining instances from being renewed.
     */
    private void renewAll() {
        try {
            // Iterate over a copy of the keys so reg/unreg during the run do not break the loop.
            for (String instance : new ArrayList<>(RENEWS.keySet())) {
                try {
                    List<String> registered = RENEWS.get(instance);
                    if (registered == null || registered.isEmpty()) {
                        continue;
                    }
                    String services = String.join(",", registered);
                    Long timestamp = renews(services, instance);
                    logger.debug(" ====>>>> [tregistry] : renew instance {} for {} at {}",
                            instance, services, timestamp);
                } catch (Exception e) {
                    logger.error(" ====>>>> [tregistry] : renew error  : {}", e.getMessage());
                }
            }
        } catch (Exception e) {
            logger.error(" ====>>>> [tregistry] : renew error  : {}", e.getMessage());
        }
    }

    /**
     * Re-resolves the leader using the currently known cluster nodes. Failures are logged;
     * the previously known leader and nodes stay in place.
     */
    private void refreshServers() {
        try {
            List<Server> known = this.servers;
            logger.debug("====>>>> [tregistry] : servers update starting : {}", known);
            if (known != null && !known.isEmpty()) {
                // init() expects ip:port and prepends the scheme itself.
                List<String> connectString = known.stream()
                        .map(server -> server.getUrl().replace("http://", "")).toList();
                logger.debug(" ====>>>> [tregistry] : servers update {} ", connectString);
                init(connectString);
            }
        } catch (Exception e) {
            logger.error("====>>>> [tregistry] : servers update error : {}", e.getMessage());
        }
    }

    /**
     * Stops all background tasks.
     */
    public void stop() {
        logger.info(" ====>>>> [tregistry] : stop with server : {}", servers);
        executors.stop();
    }


    /**
     * Subscribes to instance changes of a service via server-sent events on the leader.
     * {@link <a href="https://juejin.cn/post/7202445722972635193">...</a>}
     *
     * @param service        service name
     * @param changeListener callback handed to the event listener
     * @throws IllegalStateException if the leader is unknown and cannot be resolved
     */
    public void subscribe(String service, Runnable changeListener) {
        logger.debug("===> subscribe  staring : {}", service);
        SubEventSourceListener listener = new SubEventSourceListener(changeListener, this);
        String target = masterUrl;
        if (target == null) {
            Server leader = leader();
            if (leader == null || !leader.isStatus() || leader.getUrl() == null) {
                throw new IllegalStateException("[tregistry] no leader available to subscribe " + service);
            }
            target = leader.getUrl();
            masterUrl = target;
        }
        okHttpInvoker.sse(target + SUBSCRIBE_PATH + service, listener);
    }

    /**
     * Fetches the cluster listing from the given node, replaces {@link #servers} with it
     * (only when the listing is non-empty, so a failed probe keeps the last known nodes)
     * and returns the node flagged as leader.
     *
     * @param url registry address, e.g. http://ip:port
     * @return the leader in the listing, or {@code null} if there is none
     */
    private Server leaderSelect(String url) {
        logger.debug("===> select leader for url {}", url);
        List<Server> cluster = cluster(url);
        Server leader = null;
        for (Server server : cluster) {
            if (server.isLeader()) {
                leader = server;
            }
        }
        if (!cluster.isEmpty()) {
            // Swap in a fresh mutable copy instead of appending to the old list.
            this.servers = new ArrayList<>(cluster);
        }
        logger.debug("===> leaderSelect  leader : {}, servers: {}", leader, servers);
        return leader;
    }

    /**
     * Picks the URL of a random known cluster node, falling back to the leader URL
     * when no node is known.
     *
     * @return a registry URL
     * @throws IllegalStateException if neither a cluster node nor a leader is known
     */
    private String clusterRandomUrl() {
        List<Server> known = this.servers;
        if (known == null || known.isEmpty()) {
            String master = this.masterUrl;
            if (master != null) {
                return master;
            }
            throw new IllegalStateException("[tregistry] no registry server available");
        }
        return known.get(random.nextInt(known.size())).getUrl();
    }

    /**
     * Sleeps for {@link #RETRY_PAUSE_MILLIS} before the next lookup attempt.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored)
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_PAUSE_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }


}
